[ 비고 ]
정상인지 아닌지 알수가 없다.
그냥 잘 된거 같아서 정상으로 판단 하고 있는데 제발 누가 제대로 이거 한건지 검토좀 해줬으면 좋겠다.
-------------------------------------------------------------------------------------------------------------------------------------------------------
우선 진짜 일주일동한 너무 삽질을 많이 해서, broker/zookeeper 구조부터 바꾸고 시작했다.
기존에 계속 테스트를 했던 구조는 한 서버에 broker와 zookeeper를 같이 쓰고 있고,
kafka 엔진에서 제공하는 zookeeper.properties 로 zookeeper를 띄웠다.
** apache zookeeper 로 따로 구성해서 쓴 것도 있었지만, 구축 테스를 거듭 하면서 kafka 패키지 안에 있는 것으로 변경해서 했다.
*** kafka 패키지 안에 있다고 꼭 broker와 같이 써야하냐?
--> ㄴㄴㄴ. apache zookeeper를 받든 kafka engine 다운받고 그 안에 있는 zookeeper 쓰든 맘대로 구축하면 된다.
방화벽과 broker 연결에 이슈만 없으면 크게 상관 없단 이야기다.
===========================================================
그냥 kraft모드로 "새로 설치" 해서 테스트 하는건 아무 이슈가 없었다.
저렇게 동일하게 써도 아무 이상 없었다는것이다. (broker/controller 1set 씩 3EA)
===========================================================
확인이 필요했던 건 "현재 가지고 있는 데이터를 그대로 이관 할 수 있냐" 였다.
물론 방법은 있었다.
gpt, confluent, apache kafka 공식문서 확인 등등 난리를 쳤었는데 잘안됐다.
결론을 내린건 내가 표준형상이랍시고 구성했던 kafka 형상의 이슈였던 것 같다.
하지만 현재 이 구성으로 테스트 한 것도 정상인지 의문이 드는 부분이 있었는데 아래 따로 정리.
근데 사실 apache 공식문서는 must use 3.9였다.ㅋㅋ
그리고 back to zookeeper도 불가라고 써져있는 것 같다.
https://kafka.apache.org/documentation/#kraft_zk_migration
Preparing for migrationBefore beginning the migration, the Kafka brokers must be upgraded to software version 3.9.0 and have the "inter.broker.protocol.version" configuration set to "3.9".---
|
============================================================
[ 환경 ]
리눅스 (rocky) // gcp 활용
egress/ingress 귀찮아 그냥 다 같은 섭넷에 vm을 만들었다.
그리고 테스트를 위해 기존에 구축했던 broker/zookeeper 구성을 아래와 같이 만들었다.
참고로 zookeeper1/controller가 배치될 서버 호스트명이 broker3 이다.
zookeeper 바꾸면서 있었던 헤프닝은 여기.
[ kafka version ]
3.8.1 로 했다. confluence kafka 아니고 일반 Apache kafka 다.
https://downloads.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz
[ 순서 및 설정 방법 ]
1. 원본 형상으로 기동 및 서비스 정상 확인
서비스 기동 후 broker1,2가 broker3의 2181 포트(zookeeper 포트)에 접속 되어 있는지 확인.
zktopic을 replication 2, partiton 2로 생성하고 data produce/consume 진행.
broker1, 2 서버에서만 "broker" 역할을 하게 했고, broker3 서버를 zookeeper로 만들어서,
log data는 broker1,2 에만 정확하게 쌓이는지 확인 완료.
consume offset도 log data 적재 여부 확인 완료.
2. controller.properties 파일 수정
kraft 모드에서는 server.properties를 통해 broker/controller 역할을 둘다 할 수도 있지만,
broker.properties와 controller.properties 두개로 나눠서 할 수도 있다.
- 원본 파일 위치 : /kafka/kafka_2.13-3.8.1/config/kraft
** 참고로 나는 회사에서 내가 지정해버린 구성안으로 되어있어 저 파일을 별도 위치로 구성했다.
수정 내용은 apache kafka 공식문서 활용했고, 빠른 이해를 돕기 위해 서버 내용 통으로 가져옴.
수정한 내용은 서버에 맞게 수정해야 하는 것도 있고, migration 을 위해 넣어야 하는 것도 있으니 참고.
### 수정 내용
controller.properties 원본 | 수정 내용 |
[kafka@broker3 kraft]$ cat controller.properties | grep -v ^# | grep -v ^$ process.roles=controller node.id=1 controller.quorum.voters=1@localhost:9093 listeners=CONTROLLER://:9093 controller.listener.names=CONTROLLER num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kraft-controller-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 |
[kafka@broker3 configuration]$ cat controller.properties | grep -v ^# | grep -v ^$ process.roles=controller node.id=3 zookeeper.metadata.migration.enable=true zookeeper.connect=broker3:2181/zoo_data controller.quorum.voters=3@broker3:9093 listeners=CONTROLLER://:9093 controller.listener.names=CONTROLLER num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/kaf_data num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 inter.broker.listener.name=PLAINTEXT |
※ 기타 offsets.topic.repication.factor 같은 경우는
controller 에서 사용하는 __cluster_metadata-0 이란 토픽에 대해 다중 kraft controller 복제로 판단 된다.
현재는 controller가 1대라 우선 그냥 두었다.
3. formatting
zookeeper 가 띄워져 있는 서버에서 진행 했다. (그게 당연한거..같아서?)
--> broker3번 서버에서만 진행 했단 소리
필요한건 cluster id 인데,
cluster id는 broker1,2 의 데이터 디렉토리 설정한 곳에 가면 (나는 /kaf_data)
meta.properties << 파일이 있다. cat으로 열어서 확인 할 수 있다. (directory id와 헷갈리지 말 것)
cluster id는 지정된 log dir에 대해 연결된 서버가 전부 동일하다.
[kafka@broker1 kaf_data]$ cat meta.properties # #Thu Dec 05 14:51:50 KST 2024 broker.id=1 directory.id=AXbyX11PVRlgLMFpiWR94g version=0 cluster.id=l6QUPjHdTA-QQhQtZiMv5g |
** 아래 색칠해진 경로는 사내 내부 지정 경로
# /kafka/kafka_2.13-3.8.1/bin/kafka-storage.sh format -t cluster id -c /kafka/app/configuration/controller.properties
여기서 진짜 애먹었는데, 기존에 broker로 3대 쓸 때 .lock 파일 이슈(data dir)와 log directory already use, 기타등등 있었다.
영어 해석이 잘 안돼서 공식문서 제대로 못 읽은 탓이었을지도 모르나,
구조 바꾸고 나서 broker3번 서버에 zookeeper만 있고, log dir이 싹 비어있는 상태이므로 깔끔하게 format 했다.
4. controller.properties 기동
기동은 kafka-server-start.sh 파일 통해서 기동 했다.
log dir 지정은 그냥 kafka log 떨구는 형상 그대로 해뒀다.
** 아래 색칠해진 경로는 사내 내부 지정 경로
# kafka-server-start.sh -daemon /kafka/app/configuration/controller.properties
5. 로그 확인
로그를 broker 떨구는데다 매핑해놔서, kafkaServer.out 과 controller.log 두개를 봤다.
Completed migration~을 봤다. ㅠㅠ
controller.log |
[2024-12-05 14:49:47,103] INFO [QuorumController id=3] Replayed a ZkMigrationStateRecord changing the migration state from NONE to PRE_MIGRATION. (org.apache.kafka.controller.FeatureControlManager) [2024-12-05 14:51:52,418] INFO [QuorumController id=3] Starting migration of ZooKeeper metadata to KRaft. (org.apache.kafka.controller.QuorumController) [2024-12-05 14:51:52,622] INFO [QuorumController id=3] Completing migration of ZooKeeper metadata to KRaft. (org.apache.kafka.controller.QuorumController) [2024-12-05 14:51:52,622] INFO [QuorumController id=3] Replayed a ZkMigrationStateRecord changing the migration state from PRE_MIGRATION to MIGRATION. (org.apache.kafka.controller.FeatureControlManager) ~~~ |
server.log |
[2024-12-05 14:51:52,651] INFO [KRaftMigrationDriver id=3] Completed migration of metadata from ZooKeeper to KRaft. 57 records were generated in 230 ms across 1 batches. The average time spent waiting on a batch was 44.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=52, CONFIG_RECORD=3}. The current metadata offset is now 319 with an epoch of 1. Saw 2 brokers in the migrated metadata [1, 2]. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,659] INFO [KRaftMigrationDriver id=3] Finished initial migration of ZK metadata to KRaft. Transitioned migration state from ZkMigrationLeadershipState{kraftControllerId=3, kraftControllerEpoch=1, kraftMetadataOffset=-1, kraftMetadataEpoch=-1, lastUpdatedTimeMs=1733377787568, migrationZkVersion=0, controllerZkEpoch=8, controllerZkVersion=8} to ZkMigrationLeadershipState{kraftControllerId=3, kraftControllerEpoch=1, kraftMetadataOffset=319, kraftMetadataEpoch=1, lastUpdatedTimeMs=1733377787568, migrationZkVersion=1, controllerZkEpoch=8, controllerZkVersion=8} (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,659] INFO [KRaftMigrationDriver id=3] 3 transitioning from ZK_MIGRATION to SYNC_KRAFT_TO_ZK state (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,659] INFO [KRaftMigrationDriver id=3] Expected driver state ZK_MIGRATION but found SYNC_KRAFT_TO_ZK. Not running this event MigrateMetadataEvent. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,659] INFO [KRaftMigrationDriver id=3] Performing a full metadata sync from KRaft to ZK. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,728] INFO [KRaftMigrationDriver id=3] Made the following ZK writes when reconciling with KRaft state: {} (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,728] INFO [KRaftMigrationDriver id=3] 3 transitioning from SYNC_KRAFT_TO_ZK to KRAFT_CONTROLLER_TO_BROKER_COMM state (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,728] INFO [KRaftMigrationDriver id=3] Expected driver state SYNC_KRAFT_TO_ZK but found KRAFT_CONTROLLER_TO_BROKER_COMM. Not running this event SyncKRaftMetadataEvent. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,733] INFO [KRaftMigrationDriver id=3] Sending RPCs to broker before moving to dual-write mode using at offset and epoch OffsetAndEpoch(offset=320, epoch=1) (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,827] INFO [KRaftMigrationDriver id=3] 3 transitioning from KRAFT_CONTROLLER_TO_BROKER_COMM to DUAL_WRITE state (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:51:52,827] INFO [KRaftMigrationDriver id=3] Expected driver state KRAFT_CONTROLLER_TO_BROKER_COMM but found DUAL_WRITE. Not running this event SendRPCsToBrokersEvent. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [2024-12-05 14:59:47,931] INFO [NodeToControllerChannelManager id=3 name=registration] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient) |
6. broker도 변경
우선 기존 broker에도 zookeeper migration mode를 넣어야 한다는 내용이 있었다.
공식 문서 중 " Here is a sample config for a broker that is ready for migration: " 라는 부분 참고.
그 중 Set the IBP는 하지 않음 :: 왜냐 나는 3.8.1이니까.
node.id 는 설명에 없었는데, 기존 테스트 시 kraft 모드로 broker가 못찾는것 같은 에러가 있어 별도로 추가.
### 수정 내용
server.properties 기존 파일 | 수정 내용 |
broker.id=1 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://broker1:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/kaf_data num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=240 log.retention.bytes=3221225472 log.segment.bytes=100663296 log.retention.check.interval.ms=300000 zookeeper.connect=broker3:2181/zoo_data zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 |
broker.id=1 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://broker1:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/kaf_data num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=240 log.retention.bytes=3221225472 log.segment.bytes=100663296 log.retention.check.interval.ms=300000 zookeeper.connect=broker3:2181/zoo_data zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 node.id=1 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT zookeeper.metadata.migration.enable=true controller.quorum.voters=3@broker3:9093 controller.listener.names=CONTROLLER |
그리고 broker1, 2 서버에 띄워져 있는 broker를 재기동 했다.
정상으로 서비스가 띄워졌고, 추가로 zktopic 에 데이터 producer/consumer 수행 했을 때 데이터도 특이사항 없이 잘 연동 됐다.
(log 없음 쏘리)
7. controller 와 broker를 kraft 모드로 전환
zookeeper를 끊어내기 위한 작업을 시도한다.
controller.properties에 있는 zookeeper migration 옵션 관련 아래 2개를 주석처리 하고,
server.properties는 kraft 전용으로 수정 후 기동한다.
server.properties는 기존의 /kafka/kafka_2.13-3.8.1/config/kraft 에 있는 broker.properties 파일을, server.properties 파일로 이름만 바꿨다.
참고로 저 위치에 server.properties도 있고, 해당 파일은 broker/controller 동시에 활용할 수 있도록 만들어 둔 파일이니 헷갈리지 말았으면 한다.
내 편의를 위해 server.properties로 덮어 씌운거다...
anyway broker.properties든 server.properties 든 broker 역할을 하는건 2개기 때문에 각 서버에서 node.id 랑 잘 맞춰서 생성해주면 된다. (위에 controller 설명때 부터 보면 알겠지만, node.id 는 broker.id와 나는 동일하게 했다.)
controller.properties | server.properties (원래는 broker.properties) |
[kafka@broker3 configuration]$ cat controller.properties | grep -v ^# | grep -v ^$ process.roles=controller node.id=3 #zookeeper.metadata.migration.enable=true #zookeeper.connect=broker3:2181/zoo_data controller.quorum.voters=3@broker3:9093 listeners=CONTROLLER://:9093 controller.listener.names=CONTROLLER num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/kaf_data num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 inter.broker.listener.name=PLAINTEXT |
[kafka@broker1 configuration]$ cat server.properties | grep -v ^# | grep -v ^$ process.roles=broker node.id=1 controller.quorum.voters=3@broker3:9093 listeners=PLAINTEXT://broker1:9092 inter.broker.listener.name=PLAINTEXT advertised.listeners=PLAINTEXT://broker1:9092 controller.listener.names=CONTROLLER listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/kaf_data num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 |
재빠르게 controller.properties 를 재기동 하고, 그 이후 broker 를 재기동 했다.
8. topic 데이터 전송 테스트
잘 됨. 잘 된거같음. 끝시마이.
1) 기존 zktopic 테스트 : 정상
producer | date : Thu Dec 5 15:30:43 KST 2024 [ topic status ] Topic: zktopic TopicId: ziP40aoWQySpjGCTr7U2uw PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824 Topic: zktopic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr: Topic: zktopic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1 Elr: LastKnownElr: ------------------------- [ topic partition logfile size changed ] -rw-rw-r-- 1 kafka kafka 1842 Dec 5 15:30 /kaf_data/zktopic-0/00000000000000000000.log -rw-rw-r-- 1 kafka kafka 1764 Dec 5 15:30 /kaf_data/zktopic-1/00000000000000000000.log ------------------------- [ topic partition offset ] zktopic 0 153 zktopic 1 147 |
consumer | date : Thu Dec 5 15:32:22 KST 2024 [ consumer group describe ] GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID zktopic-group zktopic 0 153 153 0 consumer-zktopic-group-1-b0d30229-1c79-444a-b5e4-390e39479641 /10.128.0.2 consumer-zktopic-group-1 zktopic-group zktopic 1 147 147 0 consumer-zktopic-group-1-b0d30229-1c79-444a-b5e4-390e39479641 /10.128.0.2 consumer-zktopic-group-1 -------------------------- [ consumer group state ] [2024-12-05 15:32:25,672] WARN [AdminClient clientId=adminclient-1] Connection to node -3 (broker3/10.128.0.4:9092) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient) GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS zktopic-group broker2:9092 (2) range Stable 1 -------------------------- [ consume offset status ] __consumer_offsets 8 73 -------------------------- [kafka@broker2 script]$ |
2) 신규 kratopic 생성/데이터전송 테스트 : 정상
kratopic : partition 4, replica 2
producer | date : Thu Dec 5 15:43:36 KST 2024 [ topic status ] [2024-12-05 15:43:37,624] WARN [AdminClient clientId=adminclient-1] Connection to node -3 (broker3/10.128.0.4:9092) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient) Topic: kratopic TopicId: a1mCTkTtSy-RZc24FPIcBw PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824 Topic: kratopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr: Topic: kratopic Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr: Topic: kratopic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr: Topic: kratopic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: LastKnownElr: ------------------------- [ topic partition logfile size changed ] -rw-rw-r-- 1 kafka kafka 1194 Dec 5 15:43 /kaf_data/kratopic-0/00000000000000000000.log -rw-rw-r-- 1 kafka kafka 738 Dec 5 15:43 /kaf_data/kratopic-1/00000000000000000000.log -rw-rw-r-- 1 kafka kafka 831 Dec 5 15:43 /kaf_data/kratopic-2/00000000000000000000.log -rw-rw-r-- 1 kafka kafka 1209 Dec 5 15:43 /kaf_data/kratopic-3/00000000000000000000.log ------------------------- [ topic partition offset ] kratopic 0 93 kratopic 1 51 kratopic 2 60 kratopic 3 96 |
consumer | date : Thu Dec 5 15:44:08 KST 2024 [ consumer group describe ] GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID kratopic-group kratopic 3 96 96 0 consumer-kratopic-group-1-f2007faa-a10b-434c-9798-a2d872005199 /10.128.0.2 consumer-kratopic-group-1 kratopic-group kratopic 1 51 51 0 consumer-kratopic-group-1-f2007faa-a10b-434c-9798-a2d872005199 /10.128.0.2 consumer-kratopic-group-1 kratopic-group kratopic 2 60 60 0 consumer-kratopic-group-1-f2007faa-a10b-434c-9798-a2d872005199 /10.128.0.2 consumer-kratopic-group-1 kratopic-group kratopic 0 93 93 0 consumer-kratopic-group-1-f2007faa-a10b-434c-9798-a2d872005199 /10.128.0.2 consumer-kratopic-group-1 -------------------------- [ consumer group state ] [2024-12-05 15:44:12,448] WARN [AdminClient clientId=adminclient-1] Connection to node -3 (broker3/10.128.0.4:9092) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient) GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS kratopic-group broker1:9092 (1) range Stable 1 -------------------------- [ consume offset status ] __consumer_offsets 31 9 __consumer_offsets 8 82 |
[ 의문점 ]
1. cluster id를 formatting 할 때, 왜 log.dir은 비어있어야 하는가
+ broker 의 log dir 로는 formatting을 하지 않았음에도 불구하고 kraft로 정상 전환이 된 이유가 뭘까.
** 안비어있음 .lock 파일 이슈 또는 already use 등으로 정상적으로 안된다.
*** 포맷 옵션 중 ignore 쓰라는거 있지만 택도없지.
**** 물론 meta.properties 파일을 삭제하고도 해봤지만 제대로 안됨.
'업무 > kafka zookeeper' 카테고리의 다른 글
[ERROR 기록] zookeeper 기동 관련(1) (0) | 2024.12.05 |
---|---|
[작성 예정] AWS kafka 구성 (0) | 2024.09.13 |
apache kafka :: kraft (0) | 2024.09.12 |
[etc] Kafdrop (0) | 2024.07.16 |
[kafka][zookeeper] min.insync.replicas (0) | 2024.04.02 |